package standalone

import (
	"github.com/DiracLee/dires-go/app/cmdline"
	"github.com/DiracLee/dires-go/app/config"
	"github.com/DiracLee/dires-go/app/connection"
	"github.com/DiracLee/dires-go/app/handler"
	"github.com/DiracLee/dires-go/app/payload"
	"github.com/DiracLee/dires-go/logger"
	"github.com/DiracLee/dires-go/tcp/parser"
	"io"
	"os"
	"strconv"
	"sync"
)

// aofQueueSize is the buffer capacity of AOFHandler.aofChan, i.e. how many
// pending records can be queued before a send blocks (1<<16 == 65536).
const aofQueueSize = 1 << 16

// AOFHandler persists executed commands to an append-only file (AOF) and
// replays that file into a Standalone server on start-up.
type AOFHandler struct {
	// aofChan queues records from AddAof for the handleAOF goroutine.
	// LoadAof temporarily sets it to nil while replaying the file.
	aofChan     chan *record
	// aofFile is the AOF opened for appending by NewAOFHandler.
	aofFile     *os.File
	// aofFileName is the AOF path, taken from config.GetAppendFile().
	aofFileName string
	// aofFinished is signalled by handleAOF once aofChan has been closed and
	// drained; Close waits on it before closing aofFile.
	aofFinished chan struct{}
	// svr is the server that replayed commands are executed against.
	svr         *Standalone
	// tmpDBMaker is stored by NewAOFHandler but not used in this file.
	// NOTE(review): presumably creates a scratch server for Rewrite — confirm.
	tmpDBMaker  func() *Standalone
	// currentDB is the database index most recently selected in the AOF; it
	// is maintained by handleAOF to decide when a SELECT must be emitted.
	currentDB   int
	// pausingAof is read-locked by handleAOF around each record write.
	// NOTE(review): no writer is visible here; presumably a rewrite takes the
	// write lock to pause appends — confirm.
	pausingAof  sync.RWMutex
}

// NewAOFHandler builds an AOFHandler for svr.
//
// It first replays the existing append-only file (if any) into svr, then
// opens the file for appending and starts the background goroutine that
// drains the record queue. An error is returned only when the file cannot be
// opened for writing.
func NewAOFHandler(svr *Standalone, tmpDBMaker func() *Standalone) (*AOFHandler, error) {
	h := &AOFHandler{
		aofFileName: config.GetAppendFile(),
		svr:         svr,
		tmpDBMaker:  tmpDBMaker,
	}

	// Replay before the queue exists so the loaded commands are not queued
	// for writing again.
	h.LoadAof(0)

	openFlags := os.O_APPEND | os.O_CREATE | os.O_RDWR
	f, err := os.OpenFile(h.aofFileName, openFlags, 0600)
	if err != nil {
		return nil, err
	}

	h.aofFile = f
	h.aofChan = make(chan *record, aofQueueSize)
	h.aofFinished = make(chan struct{})
	go h.handleAOF()

	return h, nil
}

// LoadAof replays the append-only file into h.svr.
//
// Commands are parsed from the file named by h.aofFileName and executed via
// h.svr.Execute on a mock connection. When maxBytes is positive only the first
// maxBytes bytes of the file are read; otherwise the whole file is replayed.
// A missing file means there is nothing to load and is not reported; any other
// failure to open the file is logged as a warning. Records that fail to parse,
// are empty, are not multi-bulk payloads, or fail to execute are logged and
// skipped.
func (h *AOFHandler) LoadAof(maxBytes int) {
	// Detach aofChan while replaying: AddAof drops records when the channel is
	// nil, which prevents writing the loaded commands again. The channel is
	// restored on return.
	aofChan := h.aofChan
	h.aofChan = nil
	defer func() {
		h.aofChan = aofChan
	}()

	file, err := os.Open(h.aofFileName)
	if err != nil {
		// os.Open always returns a *os.PathError, so a type check cannot
		// distinguish "no file yet" from a real failure such as permission
		// denied. Only the not-exist case is expected and silent.
		if !os.IsNotExist(err) {
			logger.Warn(err)
		}
		return
	}
	defer func() {
		// The handle is read-only; a close error carries nothing actionable.
		_ = file.Close()
	}()

	var reader io.Reader = file
	if maxBytes > 0 {
		reader = io.LimitReader(file, int64(maxBytes))
	}
	ch := parser.ParseStream(reader)
	fakeConn := &connection.MockConnection{} // only used for save dbIndex
	for result := range ch {
		if result.Err != nil {
			if result.Err == io.EOF {
				break
			}
			logger.Error("parse error: " + result.Err.Error())
			continue
		}
		if result.Data == nil {
			logger.Error("empty record")
			continue
		}
		r, ok := result.Data.(*payload.MultiBulkPayload)
		if !ok {
			logger.Error("require multi bulk payload")
			continue
		}
		ret := h.svr.Execute(fakeConn, r.Args)
		if payload.IsErrorPayload(ret) {
			logger.Error("exec err: ", ret)
		}
	}
}

// record is one queued AOF entry: a command line together with the index of
// the database it was executed against.
type record struct {
	// cmdLine is the command and its arguments, serialized by handleAOF as a
	// multi-bulk payload.
	cmdLine [][]byte
	// dbIndex is compared with AOFHandler.currentDB by handleAOF to decide
	// whether a SELECT must be written first.
	dbIndex int
}

// Rewrite is the entry point for rewriting the append-only file. It is called
// synchronously by handleRewriteAOF and in a goroutine by handleBGRewriteAOF.
// It is currently an unimplemented stub and does nothing.
func (h *AOFHandler) Rewrite() {
	// TODO
}

// AddAof queues cmdLine, tagged with dbIndex, for the AOF goroutine.
// The record is dropped when append-only mode is disabled in the config or
// when the queue is currently detached (nil). The send blocks while the queue
// is full.
func (h *AOFHandler) AddAof(dbIndex int, cmdLine [][]byte) {
	if !config.GetAppendOnly() || h.aofChan == nil {
		return
	}
	entry := &record{
		dbIndex: dbIndex,
		cmdLine: cmdLine,
	}
	h.aofChan <- entry
}

// Close shuts the AOF pipeline down in order: it closes the record queue,
// waits for the writer goroutine to drain it, and then closes the file.
// It is a no-op when no AOF file has been opened. A failure to close the file
// is logged as a warning.
func (h *AOFHandler) Close() {
	if h.aofFile == nil {
		return
	}

	close(h.aofChan)
	// Block until handleAOF has written every queued record.
	<-h.aofFinished

	if err := h.aofFile.Close(); err != nil {
		logger.Warn(err)
	}
}

// handleAOF is the AOF writer loop. It receives records from aofChan one at a
// time and appends each to aofFile as a multi-bulk payload, emitting a SELECT
// command first whenever the record's database differs from currentDB. When
// aofChan is closed and drained it signals aofFinished and returns.
//
// Write failures are logged as warnings; a failed SELECT also causes the
// record that needed it to be skipped, since writing it would attribute the
// command to the wrong database.
func (h *AOFHandler) handleAOF() {
	// appendRecord writes a single record while holding the read side of
	// pausingAof. The deferred unlock guarantees the lock is released on
	// every path, including the early return after a failed SELECT write.
	appendRecord := func(p *record) {
		h.pausingAof.RLock() // prevent other goroutines from pausing aof
		defer h.pausingAof.RUnlock()

		if p.dbIndex != h.currentDB {
			// select database
			selectCmd := handler.StringsCommand("SELECT", strconv.Itoa(p.dbIndex))
			data := payload.NewMultiBulkPayload(selectCmd).Bytes()
			if _, err := h.aofFile.Write(data); err != nil {
				logger.Warn(err)
				return // skip this command
			}
			h.currentDB = p.dbIndex
		}

		data := payload.NewMultiBulkPayload(p.cmdLine).Bytes()
		if _, err := h.aofFile.Write(data); err != nil {
			logger.Warn(err)
		}
	}

	// serialized execution
	h.currentDB = 0
	for p := range h.aofChan {
		appendRecord(p)
	}
	h.aofFinished <- struct{}{}
}

// handleBGRewriteAOF starts an AOF rewrite in a new goroutine and replies
// immediately with a status payload; it does not wait for the rewrite to
// complete. conn and args are unused.
func handleBGRewriteAOF(svr *Standalone, conn connection.Connection, args cmdline.CmdLine) payload.Payload {
	const startedMsg = "Background append only file rewriting started"

	aof := svr.aofHandler
	go aof.Rewrite()

	return payload.NewStatusPayload(startedMsg)
}

// handleRewriteAOF runs an AOF rewrite synchronously and replies with a status
// payload only after Rewrite has returned. conn and args are unused.
// NOTE(review): the reply text still says "Background ... started" although
// the rewrite has already finished at this point — confirm this is intended.
func handleRewriteAOF(svr *Standalone, conn connection.Connection, args cmdline.CmdLine) payload.Payload {
	const statusMsg = "Background append only file rewriting started"

	aof := svr.aofHandler
	aof.Rewrite()

	return payload.NewStatusPayload(statusMsg)
}
